package com.flink.examples.checkpoint;

import com.flink.examples.DataSource;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Description Checkpoint使Flink的状态具有良好的容错性，通过checkpoint 机制，Flink可以在JOB发生错误时对作业的状态和计算位置进行恢复。
 * @Author JL
 * @Date 2021/10/29
 * @Version V1.0
 */
public class Checkpoint {

    /**
     * Checkpoint 在默认的情况下仅用于恢复失败的作业，并不保留，当程序取消时 checkpoint 就会被删除。
     * Checkpoint 由元数据文件、数据文件（与 state backend 相关）组成
     * 可通过配置文件中 “state.checkpoints.dir” 配置项来指定元数据文件和数据文件的存储路径，另外也可以在代码中针对单个作业特别指定该配置项。
     * 命令行恢复作业：bin/flink run -s :checkpointMetaDataPath [:runArgs] appJob.jar
     * Checkpoint机制参见官方：https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/checkpoints.html
     * Task故障恢复参见官方：https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/task_failure_recovery.html
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度为1
        env.setParallelism(1);
        //启用对数据流作业的检查快照。数据流的分布状态将被定期进行快照。执行Checkpointing的时间间隔为1分钟。
        env.enableCheckpointing(1000);
        //设置检查点数据存储的文件系统和位置。
        //flink支持的各种文件系统参见：https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/deployment/filesystems/overview/
        //hdfs://是以hadoop做为文件系统存放flink的checkpoint快照数据
        //env.setStateBackend(new FsStateBackend("hdfs://localhost:9000/flink/checkpoint"));
        //file:///是以主机本地文件目录存放flink的checkpoint快照数据(在该目录下会写入窗口数据，当发生错误job退出再次重后，会从最新快照恢复缓存的数据流重新丢入算子中计算)
        env.setStateBackend((StateBackend)new FsStateBackend("file:///test/checkpoint/"));

        /*
          固定间隔 (Fixed Delay) 重启策略:
          如果没有启用checkpointing，则使用无重启(no restart)策略。
          如果启用了checkpointing，但没有配置重启策略，则使用固定间隔(fixed-delay)策略，其中 Integer.MAX_VALUE 参数是尝试重启次数。
        */
        //配置固定间隔自动重启策略：失败后最多重启3次，每次重启等待10000ms(10秒)
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        /*
           失败率 (Failure Rate) 重启策略:
           当超过 failure rate (一个时间段内的失败次数) 时工作宣告失败。
           在连续两次重启尝试中，该重启策略会等待一端固定的时间。
         */
        //配置失败率自动重启策略：时间段内最多3次失败，5分钟内（时间段），每次重启等待10000ms(10秒)
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));

        /*
            无重启 (No Restart) 策略：
            没有任何工作重启，若工作失败则直接宣告失败。
         */
        //配置无自动重启策略
        //env.setRestartStrategy(RestartStrategies.noRestart());

        //检查快照配置
        CheckpointConfig config = env.getCheckpointConfig();
        // 1分钟内完成快照生成，否则超时，超时取消本次快照检查点
        config.setCheckpointTimeout(60 * 1000);
        //【checkpoint最小间隔】单位：毫秒，设置检查点之间的最小暂停时间。此设置定义了要使用多久后在触发另一个检查点
        config.setMinPauseBetweenCheckpoints(500);
        //ExternalizedCheckpointCleanup 配置项定义了当作业取消时，对作业 checkpoint 的操作：
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION：当作业取消时，保留作业的 checkpoint。注意，这种情况下，需要手动清除该作业保留的 checkpoint。
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION：当作业取消时，删除作业的 checkpoint。仅当作业失败时，作业的 checkpoint 才会被保留。
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //设置检查点模式（exactly-once 或 at-least-once）。
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置可同时进行的最大检查点尝试次数。
        config.setMaxConcurrentCheckpoints(1);

        //测试数据集
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
        //流数据处理
        DataStream<String> dataStream = env.fromCollection(tuple3List).flatMap(new FlatMapFunction<Tuple3<String,String,Integer>, String>() {
            @Override
            public void flatMap(Tuple3<String, String, Integer> value, Collector<String> out) throws Exception {
                System.out.println(new Gson().toJson(value));
                if (!value.f1.equals("man")){
                    String msg = value.f0 + " sex is not 'man' , throw error!";
                    System.out.println(msg);
                    throw new RuntimeException(msg);
                }
                out.collect(value.f0);
            }
        });
        //打印
        dataStream.print().name("Checkpoint");
        //执行job
        env.execute("flink Checkpoint job");
    }

}
